Redis高级客户端Lettuce详解

您所在的位置:网站首页 sync 4 Redis高级客户端Lettuce详解

Redis高级客户端Lettuce详解

#Redis高级客户端Lettuce详解| 来源: 网络整理| 查看: 265

前提

Lettuce是一个Redis的Java驱动包,初识她的时候是使用RedisTemplate的时候遇到点问题Debug到底层的一些源码,发现spring-data-redis的驱动包在某个版本之后替换为Lettuce。Lettuce翻译为生菜,没错,就是吃的那种生菜,所以它的Logo长这样:

既然能被Spring生态所认可,Lettuce想必有过人之处,于是笔者花时间阅读她的官方文档,整理测试示例,写下这篇文章。编写本文时所使用的版本为Lettuce 5.1.8.RELEASE,SpringBoot 2.1.8.RELEASE,JDK [8,11]。超长警告:这篇文章断断续续花了两周完成,超过4万字.....

Lettuce简介

Lettuce是一个高性能基于Java编写的Redis驱动框架,底层集成了Project Reactor提供天然的反应式编程,通信框架集成了Netty使用了非阻塞IO,5.x版本之后融合了JDK1.8的异步编程特性,在保证高性能的同时提供了十分丰富易用的API,5.1版本的新特性如下:

支持Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX。 支持通过Brave模块跟踪Redis命令执行。 支持Redis Streams。 支持异步的主从连接。 支持异步连接池。 新增命令最多执行一次模式(禁止自动重连)。 全局命令超时设置(对异步和反应式命令也有效)。 ......等等

注意一点:Redis的版本至少需要2.6,当然越高越好,API的兼容性比较强大。

只需要引入单个依赖就可以开始愉快地使用Lettuce:

Maven io.lettuce lettuce-core 5.1.8.RELEASE Gradle dependencies { compile 'io.lettuce:lettuce-core:5.1.8.RELEASE' } 连接Redis

单机、哨兵、集群模式下连接Redis需要一个统一的标准去表示连接的细节信息,在Lettuce中这个统一的标准是RedisURI。可以通过三种方式构造一个RedisURI实例:

定制的字符串URI语法: RedisURI uri = RedisURI.create("redis://localhost/"); 使用建造器(RedisURI.Builder): RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build(); 直接通过构造函数实例化: RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS); 定制的连接URI语法 单机(前缀为redis://) 格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]] 完整:redis://[email protected]:6379/0?timeout=10s 简单:redis://localhost 单机并且使用SSL(前缀为rediss://) log.info("Set命令返回:{}", value)); // Future#get() future.get(); } // Set命令返回:OK @Test public void testAsyncSetAndGet2() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); CompletableFuture result = (CompletableFuture) ASYNC_COMMAND.set("name", "throwable", setArgs) .thenAcceptBoth(ASYNC_COMMAND.get("name"), (s, g) -> { log.info("Set命令返回:{}", s); log.info("Get命令返回:{}", g); }); result.get(); } // Set命令返回:OK // Get命令返回:throwable

如果能熟练使用CompletableFuture和函数式编程技巧,可以组合多个RedisFuture完成一些列复杂的操作。

反应式API

Lettuce引入的反应式编程框架是Project Reactor,如果没有反应式编程经验可以先自行了解一下Project Reactor。

构建RedisReactiveCommands实例:

private static RedisReactiveCommands REACTIVE_COMMAND; @BeforeClass public static void beforeClass() { REACTIVE_COMMAND = CONNECTION.reactive(); }

根据Project Reactor,RedisReactiveCommands的方法如果返回的结果只包含0或1个元素,那么返回值类型是Mono,如果返回的结果包含0到N(N大于0)个元素,那么返回值是Flux。举个例子:

@Test public void testReactivePing() throws Exception { Mono ping = REACTIVE_COMMAND.ping(); ping.subscribe(v -> log.info("Ping result:{}", v)); Thread.sleep(1000); } // Ping result:PONG @Test public void testReactiveSetAndGet() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); REACTIVE_COMMAND.set("name", "throwable", setArgs).block(); REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value)); Thread.sleep(1000); } // Get命令返回:throwable @Test public void testReactiveSet() throws Exception { REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block(); Flux flux = REACTIVE_COMMAND.smembers("food"); flux.subscribe(log::info); REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block(); Thread.sleep(1000); } // meat // bread // fish

举个更加复杂的例子,包含了事务、函数转换等:

@Test public void testReactiveFunctional() throws Exception { REACTIVE_COMMAND.multi().doOnSuccess(r -> { REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe(); REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe(); }).flatMap(s -> REACTIVE_COMMAND.exec()) .doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded())) .subscribe(); Thread.sleep(1000); } // OK // 2 // Discarded:false

这个方法开启一个事务,先把counter设置为1,再将counter自增1。

发布和订阅

非集群模式下的发布订阅依赖于定制的连接StatefulRedisPubSubConnection,集群模式下的发布订阅依赖于定制的连接StatefulRedisClusterPubSubConnection,两者分别来源于RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub():

非集群模式: // 可能是单机、普通主从、哨兵等非集群模式的客户端 RedisClient client = ... StatefulRedisPubSubConnection connection = client.connectPubSub(); connection.addListener(new RedisPubSubListener() { ... }); // 同步命令 RedisPubSubCommands sync = connection.sync(); sync.subscribe("channel"); // 异步命令 RedisPubSubAsyncCommands async = connection.async(); RedisFuture future = async.subscribe("channel"); // 反应式命令 RedisPubSubReactiveCommands reactive = connection.reactive(); reactive.subscribe("channel").subscribe(); reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe() 集群模式: // 使用方式其实和非集群模式基本一致 RedisClusterClient clusterClient = ... StatefulRedisClusterPubSubConnection connection = clusterClient.connectPubSub(); connection.addListener(new RedisPubSubListener() { ... }); RedisPubSubCommands sync = connection.sync(); sync.subscribe("channel"); // ...

这里用单机同步命令的模式举一个Redis键空间通知(Redis Keyspace Notifications)的例子:

@Test public void testSyncKeyspaceNotification() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) // 注意这里只能是0号库 .withDatabase(0) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection redisConnection = redisClient.connect(); RedisCommands redisCommands = redisConnection.sync(); // 只接收键过期的事件 redisCommands.configSet("notify-keyspace-events", "Ex"); StatefulRedisPubSubConnection connection = redisClient.connectPubSub(); connection.addListener(new RedisPubSubAdapter() { @Override public void psubscribed(String pattern, long count) { log.info("pattern:{},count:{}", pattern, count); } @Override public void message(String pattern, String channel, String message) { log.info("pattern:{},channel:{},message:{}", pattern, channel, message); } }); RedisPubSubCommands commands = connection.sync(); commands.psubscribe("__keyevent@0__:expired"); redisCommands.setex("name", 2, "throwable"); Thread.sleep(10000); redisConnection.close(); connection.close(); redisClient.shutdown(); } // pattern:__keyevent@0__:expired,count:1 // pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name

实际上,在实现RedisPubSubListener的时候可以单独抽离,尽量不要设计成匿名内部类的形式。

事务和批量命令执行

事务相关的命令就是WATCH、UNWATCH、EXEC、MULTI和DISCARD,在RedisCommands系列接口中有对应的方法。举个例子:

// 同步模式 @Test public void testSyncMulti() throws Exception { COMMAND.multi(); COMMAND.setex("name-1", 2, "throwable"); COMMAND.setex("name-2", 2, "doge"); TransactionResult result = COMMAND.exec(); int index = 0; for (Object r : result) { log.info("Result-{}:{}", index, r); index++; } } // Result-0:OK // Result-1:OK

Redis的Pipeline也就是管道机制可以理解为把多个命令打包在一次请求发送到Redis服务端,然后Redis服务端把所有的响应结果打包好一次性返回,从而节省不必要的网络资源(最主要是减少网络请求次数)。Redis对于Pipeline机制如何实现并没有明确的规定,也没有提供特殊的命令支持Pipeline机制。Jedis中底层采用BIO(阻塞IO)通讯,所以它的做法是客户端缓存将要发送的命令,最后需要触发然后同步发送一个巨大的命令列表包,再接收和解析一个巨大的响应列表包。Pipeline在Lettuce中对使用者是透明的,由于底层的通讯框架是Netty,所以网络通讯层面的优化Lettuce不需要过多干预,换言之可以这样理解:Netty帮Lettuce从底层实现了Redis的Pipeline机制。但是,Lettuce的异步API也提供了手动Flush的方法:

@Test public void testAsyncManualFlush() { // 取消自动flush ASYNC_COMMAND.setAutoFlushCommands(false); List


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3